#include <iostream>
#include <phpcpp.h>
#include <rocketmq/DefaultMQPushConsumer.h>
#include <rocketmq/MessageListener.h>
#include <rocketmq/Message.h>
#include <rocketmq/MessageExt.h>
#include <rocketmq/MessageQueue.h>
#include <rocketmq/PullResult.h>
#include <rocketmq/MQClientException.h>

#include <PhpSendResult.h>
#include <producer.h>
#include <transactionProducer.h>

class PhpMessage : public Php::Base {
public:
    PhpMessage(MessageExt* messageExt) {
        pMessageExt = messageExt;
    }

    Php::Value getMsgId() {
        return pMessageExt->getMsgId();
    }

    Php::Value getTopic() {
        return pMessageExt->getTopic();
    }

    Php::Value getTags() {
        return pMessageExt->getTags();
    }

    Php::Value getKeys() {
        return pMessageExt->getKeys();
    }

    Php::Value getBodyLen() {
        return pMessageExt->getBodyLen();
    }

    Php::Value getQueueId() {
        return pMessageExt->getQueueId();
    }

    Php::Value getReconsumeTimes() {
        return pMessageExt->getReconsumeTimes();
    }

    Php::Value getCommitLogOffset() {
        return (int64_t)pMessageExt->getCommitLogOffset();
    }

    Php::Value getBornTimestamp() {
        return (int64_t)pMessageExt->getBornTimestamp();
    }

    Php::Value getStoreTimestamp() {
        return (int64_t)pMessageExt->getStoreTimestamp();
    }

    Php::Value getPreparedTransactionOffset() {
        return (int64_t)pMessageExt->getPreparedTransactionOffset();
    }

    Php::Value getProperty(Php::Parameters& params) {
        if (params.size() < 1) {
            throw Php::Exception("Incorrect number of parameters assigned.");
        }

        return pMessageExt->getProperty(params[0].stringValue());
    }

    /**
     * return value type: char*
     */
    Php::Value getBody() {
        std::string body(pMessageExt->getBody(), pMessageExt->getBodyLen());
        
        return body;
    }

private:
    MessageExt* pMessageExt;
};

class PhpMessageListener : public MessageListenerConcurrently {
private:
    const Php::Value& _callback;

public:
    PhpMessageListener(const Php::Value& callback) : _callback(callback) {
    }

    virtual ConsumeConcurrentlyStatus consumeMessage(std::list<MessageExt *> &msgs,
                                                     ConsumeConcurrentlyContext &context);
};

class PushConsumer : public Php::Base {
public:
    PushConsumer() {
        consumer = new DefaultMQPushConsumer();
        consumer->setConsumeThreadMax(1);
        consumer->setConsumeThreadMin(1);
    }

    virtual ~PushConsumer() {
        delete(messageListener);
        delete(consumer);
    }

    void __destruct() {
        consumer->shutdown();
    }

    void start() {
        consumer->start();
    }

    void shutdown() {
        consumer->shutdown();
    }

    void setConsumerGroup(Php::Parameters& params) {
        if (params.size() < 1) {
            throw Php::Exception("Incorrect number of parameters assigned.");
        }

        consumer->setConsumerGroup(params[0].stringValue());
    }

    void subscribe(Php::Parameters& params) {
        if (params.size() != 3) {
            throw Php::Exception("Incorrect number of parameters passed in");
        }
        consumer->subscribe(params[0].stringValue(), params[1].stringValue());
        Php::Value* value = new Php::Value(params[2]);
        messageListener = new PhpMessageListener(*value);

        //MsgListener* listener = new MsgListener();
        consumer->registerMessageListener(messageListener);
    }

private:
    DefaultMQPushConsumer* consumer;
    MessageListener* messageListener;
    Php::Value* callback;
};


/**
 *  tell the compiler that the get_module is a pure C function
 */
extern "C" {

/**
 *  Function that is called by PHP right after the PHP process
 *  has started, and that returns an address of an internal PHP
 *  strucure with all the details and features of your extension
 *
 *  @return void*   a pointer to an address that is understood by PHP
 */
PHPCPP_EXPORT void *get_module()
{
    // static(!) Php::Extension object that should stay in memory
    // for the entire duration of the process (that's why it's static)
    static Php::Extension extension("rocketmqclient4php", "1.0");
    
    // Message class
    Php::Class<PhpMessage> phpMessage("PhpMessage");
    phpMessage.method("getMsgId", &PhpMessage::getMsgId, {});
    phpMessage.method("getTopic", &PhpMessage::getTopic, {});
    phpMessage.method("getTags", &PhpMessage::getTags, {});
    phpMessage.method("getKeys", &PhpMessage::getKeys, {});
    phpMessage.method("getBodyLen", &PhpMessage::getBodyLen, {});
    phpMessage.method("getBody", &PhpMessage::getBody, {});
    phpMessage.method("getQueueId", &PhpMessage::getQueueId, {});
    phpMessage.method("getReconsumeTimes", &PhpMessage::getReconsumeTimes, {});
    phpMessage.method("getCommitLogOffset", &PhpMessage::getCommitLogOffset, {});
    phpMessage.method("getBornTimestamp", &PhpMessage::getBornTimestamp, {});
    phpMessage.method("getStoreTimestamp", &PhpMessage::getStoreTimestamp, {});
    phpMessage.method("getPreparedTransactionOffset", &PhpMessage::getPreparedTransactionOffset, {});
    phpMessage.method("getProperty", &PhpMessage::getProperty, 
                            {Php::ByVal("name", Php::Type::String)});
    extension.add(std::move(phpMessage));

    // SendResult
    Php::Class<PhpSendResult> phpSendResult("PhpSendResult");
    phpSendResult.method("getMsgId", &PhpSendResult::getMsgId, {});
    phpSendResult.method("getUniqKey", &PhpSendResult::getUniqKey, {});
    extension.add(std::move(phpSendResult));

    // Consumer class
    Php::Class<PushConsumer> pushConsumer("PushConsumer");
    pushConsumer.method("start", &PushConsumer::start, {});
    pushConsumer.method("shutdown", &PushConsumer::shutdown, {});
    pushConsumer.method("__destruct", &PushConsumer::__destruct, {});
    pushConsumer.method("setConsumerGroup", &PushConsumer::setConsumerGroup,
                           {Php::ByVal("consumerGroup", Php::Type::String)});
    pushConsumer.method("subscribe", &PushConsumer::subscribe,
                           {Php::ByVal("topic", Php::Type::String),
                            Php::ByVal("tags", Php::Type::String),
                            Php::ByVal("consumeFunction", Php::Type::Callable)});

    // Producer class
    Php::Class<Producer> producer("Producer");
    producer.method("__construct", &Producer::__construct, {});
    producer.method("__destruct", &Producer::__destruct, {});
    producer.method("send", &Producer::send, {
                            Php::ByVal("topic", Php::Type::String),
                            Php::ByVal("tag", Php::Type::String),
                            Php::ByVal("key", Php::Type::String),
                            Php::ByVal("content", Php::Type::String)});
    producer.method("start", &Producer::start, {});
    producer.method("setProducerGroup", &Producer::setProducerGroup, {
                            Php::ByVal("groupName", Php::Type::String)});

    // TransactionProducer
    Php::Class<TransactionProducer> transactionProducer("TransactionProducer");
    transactionProducer.method("__construct", &TransactionProducer::__construct, {});
    transactionProducer.method("__destruct", &TransactionProducer::__destruct, {});
    transactionProducer.method("sendMessageInTransaction", &TransactionProducer::sendMessageInTransaction, {
                            Php::ByVal("topic", Php::Type::String),
                            Php::ByVal("tag", Php::Type::String),
                            Php::ByVal("key", Php::Type::String),
                            Php::ByVal("content", Php::Type::String)});
    transactionProducer.method("endTransaction", &TransactionProducer::endTransaction, {
                            Php::ByVal("msgId", Php::Type::String),
                            Php::ByVal("transState", Php::Type::String)});
    transactionProducer.method("start", &TransactionProducer::start, {});
    transactionProducer.method("setProducerGroup", &TransactionProducer::setProducerGroup, {
                            Php::ByVal("groupName", Php::Type::String)});

    // use namespace
    Php::Namespace Rocketmq("Rocketmq");
    Rocketmq.add(std::move(producer));
    Rocketmq.add(std::move(transactionProducer));
    Rocketmq.add(std::move(pushConsumer));

    extension.add(std::move(Rocketmq));

    // return the extension
    return extension;
}
}

ConsumeConcurrentlyStatus PhpMessageListener::consumeMessage(std::list<MessageExt *> &msgs,
                                                             ConsumeConcurrentlyContext &context) {
    //std::cout << "batch size: " << msgs.size() << std::endl;
    if(msgs.empty()) {
       throw Php::Exception("batch consuming message list is empty");
    }

    MessageExt* messageExt = msgs.front();

    //std::cout << "getBody-0: " << messageExt->getBody() << std::endl;
    //std::cout << "Begin to consume message. msgId: " << messageExt->getMsgId() << std::endl;
    //std::cout << "Test if callable: " << (_callback.isCallable() ? " true" : "false") << std::endl;
    if (!_callback.isCallable()) {
        throw Php::Exception("Callback PHP function is expected");
    }

    Php::Value param = Php::Object("PhpMessage", new PhpMessage(messageExt));
    Php::Value value;
    try {
        value = _callback(param);
    } catch (...) {
        std::cout << "Yuck! Bussiness code is buggy!" << std::endl;
    }

    if (!value.isNull() && value.isNumeric() && value.numericValue() > 0) {
        std::cout << "Message Consumption Failed! Retry Later." << std::endl;
        context.ackIndex = 0;
        return RECONSUME_LATER;
    }

    //std::cout << "Message Consumed OK" << std::endl;
    context.ackIndex = 1;
    return CONSUME_SUCCESS;
}
